iT邦幫忙

2019 iT 邦幫忙鐵人賽

DAY 30
0
Software Development

學習Go系列 第 30

Go day 30 (HDFS to Redis & keep going)

  • 分享至 

  • xImage
  •  

今天是鐵人賽的最後一天,最後一個主題想介紹 Go 也可以運用在 Hadoop 的 HDFS 上.所以 Go 可以運用在很多地方.

HDFS to Redis Cluster

簡單稍微介紹一下,HDFS(Hadoop Distributed File System) 是一種分散式的檔案系統.使用上跟一般的檔案系統差不多,只不過是要透過 hdfs 的指令去執行.而 Redis 是一種 in memory 的 key-value 資料庫,速度很快,而且資料量太大可以使用多台 Redis 組成 Cluster 的模式.

使用 hdfs dfs -ls 看 /user/miuser/data/model/20181030_person 底下的檔案

> hdfs dfs -ls /user/miuser/data/model/20181030_person
18/10/30 12:50:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 6 items
-rw-r--r--   2 miuser supergroup          0 2018-10-30 12:44 /user/miuser/data/model/20181030_person/_SUCCESS
-rw-r--r--   2 miuser supergroup  185979860 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00000
-rw-r--r--   2 miuser supergroup  185978526 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00001
-rw-r--r--   2 miuser supergroup  185973527 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00002
-rw-r--r--   2 miuser supergroup  185977162 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00003
-rw-r--r--   2 miuser supergroup  185986465 2018-10-30 12:44 /user/miuser/data/model/20181030_person/part-00004

使用 hdfs dfs -cat part-00004 這檔案的內容

> hdfs dfs -cat /user/miuser/data/model/20181030_person/part-00004 | head -n 1
18/10/30 12:52:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
0,0,hash:trackrapid68427621,0|||||hash:trackrapid68427621|10071,9;10058,9;10080,9;10328,9

目標就是利用 Go 把上面的 HDFS 當案 parser 後寫入 Redis Cluster.

HDFS 部分參考 HDFS for Go,API 使用可以參考 GoDoc,下載

go get github.com/colinmarc/hdfs

Redis Cluster 部分參考 go-redis.下載

go get github.com/go-redis/redis

主程式

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/colinmarc/hdfs"
	"github.com/go-redis/redis"
)

/*
	HDFS_DIR=/user/miuser/data/model/20181024_person HDFS_NAMENODE=192.168.3.1:8020 ./enrich-backupToRedis
	hdfsDirPath := "/user/miuser/data/model/20181024_person"
	hdfsClient, _ := hdfs.New("192.168.3.1:8020")
*/

func main() {
	start := time.Now()
	fmt.Println("HDFS to Redis start Time : " + start.Format("20060102150405"))
	var client *redis.ClusterClient
	hdfsDirPath := os.Getenv("HDFS_DIR")
	hdfsNameNode := os.Getenv("HDFS_NAMENODE")
	// 給 HDFS 的 Namenode ip
	hdfsClient, _ := hdfs.New(hdfsNameNode)
	//讀取 HDFS 的目錄
	hdfsFiles, _ := hdfsClient.ReadDir(hdfsDirPath)

	//取得 redis cluster 的 client
	client = redis.NewClusterClient(&redis.ClusterOptions{
		// 給 redis cluster 的 ip (這邊用了 3 台 redis)
		Addrs:    []string{"192.168.3.1:7000", "192.168.3.2:7000", "192.168.3.3:7000"},
		Password: "test",
	})

	// 測試看看Redis 連線是否正常
	if !pingRedisCluster(client) {
		fmt.Println("Connect RedisCluster Fail ! ")
		os.Exit(1)
	}

	var exeCount = 0
	var m sync.Mutex
	var wg sync.WaitGroup
	wg.Add(len(hdfsFiles))

	for _, hdfsFile := range hdfsFiles {
		var hdfsnm string
		hdfsnm = hdfsFile.Name()

		go func() {
			defer wg.Done()
			// 檔名為 part 開頭
			if strings.HasPrefix(hdfsnm, "part") {
				hdfsFile := hdfsDirPath + "/" + hdfsnm
				//取得 HDFS 的 File
				file, _ := hdfsClient.Open(hdfsFile)
				//一行一行讀取
				scanner := bufio.NewScanner(file)
				for scanner.Scan() {
					lineData := scanner.Text()
					//將每一行資料寫入 Redis cluster
					personModelToRedis(client, lineData)
					m.Lock()
					exeCount++
					m.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	end := time.Now()
	executeTime := end.Sub(start)
	fmt.Println("HDFS to Redis end Time : " + end.Format("20060102150405"))
	fmt.Printf("HDFS to Redis executeTime : %v , executeCount : %d ", executeTime, exeCount)
}

func personModelToRedis(client *redis.ClusterClient, psersonStr string) {
	//HDFS : 0,0;1;2(certset),id,last | locations | urls | kvs | profiles | ids | labels
	personModel := make(map[string]interface{})
	modelPart := strings.Split(psersonStr, "|")
	personPart := strings.Split(modelPart[0], ",")
	personID := personPart[2]
	personModel["certset"] = personPart[1]
	personModel["id"] = personID
	personModel["last"] = personPart[3]
	personModel["locations"] = modelPart[1]
	personModel["urls"] = modelPart[2]
	personModel["kvs"] = modelPart[3]
	personModel["profiles"] = modelPart[4]
	personModel["ids"] = modelPart[5]
	personModel["labels"] = modelPart[6]
	client.HMSet("usrProfile_"+personID, personModel)
}

func pingRedisCluster(client *redis.ClusterClient) bool {
	err := client.Ping().Err()
	if err == nil {
		return true
	} else {
		return false
	}
}

打包成可在 linux 上執行的執行檔,需要再 go build 前面加上參數

env GOOS=linux GOARCH=amd64 go build example1.go

給環境變數的執行方式

HDFS_DIR=/user/miuser/data/model/20181024_person HDFS_NAMENODE=192.168.3.65:8020 ./enrich-backupToRedis

使用 os.Getenv 可以在執行程式時,抓到環境變數的值.

keep going

最後整理一下這 30 天的一些主題,主要分成 go 語言的基礎與進階,接下來就是 go 的應用,每一項主題都還可以花時間再研究的更仔細.但 go 最大的優點應該是使用 goroutine 的平行處理,只要用得好 goroutine 對程式的效能來說會提升不少.

  • 基礎
1.variables
2.pointer
3.new
4.type
5.Array & slice
6.map & struct
7.function
8.methods
9.interface
10.flow control
11.select case
  • 進階
1.goroutine
2.channel
3.sync.WaitGroup & sync.Mutex
4.io
5.json
6.testing
7.benchmark
8.error handling
9.reflection
  • 應用
1.web server
2.gin-gonic
3.mongodb
4.HDFS to Redis
...

感謝~~~~~~~


上一篇
Go day 29 (error handling)
系列文
學習Go30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言